perf(hash-join): eliminate intermediate array allocations in probe-side collision filter #23209
LiaCastaneda wants to merge 4 commits into
Replace `equal_rows_arr` (Arrow take + eq_dyn_null + FilterBuilder, O(matched_pairs) allocations) with an in-place `JoinKeyComparator` loop. On collision-free build sides — detected once at build time by scanning the `next` chain for adjacent pairs with distinct keys — skip the per-pair recheck entirely: probe rows form consecutive runs in the output buffer, so we check the chain head once and accept/reject the whole run. This cuts key comparisons from F (fanout) per probe row down to 1 on uniform-key build sides, producing a 2.4× speedup on high-fanout string-key joins (Q23, SF100: 1.01s → 0.42s join_time). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
ccfebda to 06503c2
06503c2 to e3b8172
/// Example — keys `["cat", "cat", "dog"]`, next `[0, 1, 2]`:
///   row 1 → prev 0: "cat"=="cat" ✓
///   row 2 → prev 1: "dog"!="cat" → return true (collision found)
fn detect_key_collisions<T>(
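The check described in that doc comment can be sketched in plain Rust. This is a minimal illustration, not the PR's code: the name `has_key_collisions`, the use of a plain key slice, and the `u64` link type are assumptions. It also assumes, as the example suggests, that `next[row]` stores the previous row's index plus one, with 0 marking the end of a chain.

```rust
/// Hypothetical stand-in for the build-time scan: returns true if any chain
/// links two rows whose keys differ (i.e. a hash collision is present).
///
/// Assumption: `next[row]` holds `previous_row + 1`, and 0 means "no previous
/// row", matching the `["cat", "cat", "dog"]` / `[0, 1, 2]` example.
fn has_key_collisions<K: PartialEq>(keys: &[K], next: &[u64]) -> bool {
    debug_assert_eq!(keys.len(), next.len());
    next.iter().enumerate().any(|(row, &link)| {
        // Only rows with a predecessor need a comparison. If every adjacent
        // pair in a chain is equal, the whole chain shares one key.
        link != 0 && keys[row] != keys[(link - 1) as usize]
    })
}
```

For example, `has_key_collisions(&["cat", "cat", "dog"], &[0, 1, 2])` reports a collision because row 2 links back to row 1 and the two keys differ.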
I checked how another engine (Trino) handles this: when a build row is inserted, Trino only chains it with rows that share the same key, so by construction every chain is already "pure" and a chain can never mix keys that merely collide on the hash.
This works because Trino uses a hash table that resolves key equality at insert time. DataFusion's `update_from_iter` only receives hashes and row indices (it has no access to key values), so it chains all rows in the same hash bucket together regardless of whether they share a key.
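A simplified model of hash-only chaining shows why mixed chains can appear. The `ChainedMap` type and its methods below are hypothetical and are not DataFusion's actual hash map or the real `update_from_iter` signature; the sketch only assumes that insertion sees hashes and row indices and never key values.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified chained map built from hashes alone.
struct ChainedMap {
    /// hash -> (most recently inserted row with that hash) + 1
    heads: HashMap<u64, u64>,
    /// next[row] = (previous row with the same hash) + 1, or 0 at chain end
    next: Vec<u64>,
}

impl ChainedMap {
    /// Builds the chains without ever looking at key values.
    fn from_hashes(hashes: &[u64]) -> Self {
        let mut heads = HashMap::new();
        let mut next = vec![0u64; hashes.len()];
        for (row, &hash) in hashes.iter().enumerate() {
            // The new row becomes the chain head and links to the old head.
            let previous_head = heads.insert(hash, row as u64 + 1).unwrap_or(0);
            next[row] = previous_head;
        }
        ChainedMap { heads, next }
    }

    /// Rows chained under `hash`, newest first.
    fn chain(&self, hash: u64) -> Vec<usize> {
        let mut rows = Vec::new();
        let mut link = self.heads.get(&hash).copied().unwrap_or(0);
        while link != 0 {
            let row = (link - 1) as usize;
            rows.push(row);
            link = self.next[row];
        }
        rows
    }
}
```

If two distinct keys happen to hash to the same value, say rows 0 and 1 both with hash 7, `chain(7)` returns both rows, so the probe side still has to compare keys. A table that compares keys at insert time would never place them in the same chain.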
Note that doing the same in DF would be non-trivial: it would require changing the `update_from_iter` trait method signature to accept key columns alongside the hashes, and updating all call sites accordingly.
Yes, I think this could be faster, at least for single-column keys or with type / join-type specialization.
The reason I think we (or I) pursued this design is that `take` is (much) faster than individual dispatch (most recent literature also seems to implement the check in a separate pass), and it is probably easier to implement for all the join types.
Also, combining it with the filter is a bit easier with the indices -> filter indices -> collision check flow, etc.
c3b077f to d647868
run benchmark tpch10 tpcds
🤖 Benchmark running (GKE). Comparing perf/hash-join-no-take-in-probe (d647868) to bde8e5b (merge-base) diff using: tpcds
🤖 Benchmark running (GKE). Comparing perf/hash-join-no-take-in-probe (d647868) to bde8e5b (merge-base) diff using: tpch10
🤖 Benchmark completed (GKE): tpch10
🤖 Benchmark completed (GKE): tpcds
It looks like it is mostly faster, but it also regresses on some.
Which issue does this PR close?
Rationale for this change
When HashJoinExec probes the hash table, it collects all candidate (build_idx, probe_idx) pairs and then rechecks every pair with `equal_rows_arr` (which shows up in the profile at 20% of the query CPU) to filter out hash collisions. This recheck materializes intermediate Arrow arrays sized to the number of matched pairs (`take` + `eq_dyn_null` + `FilterBuilder`), making it O(matched_pairs) in both allocations and comparisons.
For high-fanout joins (many build rows per distinct key), matched_pairs = probe_rows × fanout. At fanout 78× over 2.3M probe rows this produces ~176M pairs, and the recheck runs on all of them even though nearly all pass.
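To make the allocation pattern concrete, here is a plain-Rust analogue of a gather, compare, filter recheck. It is only a sketch: `recheck_materialized` is a hypothetical name, `Vec` stands in for Arrow arrays, and the three stages merely mirror the roles of `take`, `eq_dyn_null`, and `FilterBuilder` without calling them.

```rust
/// Hypothetical analogue of a materializing recheck: every stage allocates
/// buffers whose length equals the number of candidate pairs.
fn recheck_materialized<K: Clone + PartialEq>(
    build_keys: &[K],
    probe_keys: &[K],
    build_idx: &[u64],
    probe_idx: &[u32],
) -> (Vec<u64>, Vec<u32>) {
    // "take": gather one key per candidate pair from each side.
    let left: Vec<K> = build_idx.iter().map(|&b| build_keys[b as usize].clone()).collect();
    let right: Vec<K> = probe_idx.iter().map(|&p| probe_keys[p as usize].clone()).collect();

    // "eq": build a boolean mask with one entry per candidate pair.
    let mask: Vec<bool> = left.iter().zip(&right).map(|(l, r)| l == r).collect();

    // "filter": copy the surviving indices into fresh output buffers.
    let kept_build = build_idx.iter().zip(&mask).filter(|(_, m)| **m).map(|(&b, _)| b).collect();
    let kept_probe = probe_idx.iter().zip(&mask).filter(|(_, m)| **m).map(|(&p, _)| p).collect();
    (kept_build, kept_probe)
}
```

With ~176M candidate pairs, each of `left`, `right`, and `mask` in this model would hold ~176M entries, even when almost every pair survives the filter.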
This cost is most visible when:
What changes are included in this PR?
Step 1: Replace `equal_rows_arr` with an in-place `JoinKeyComparator` loop, eliminating the O(matched_pairs) Arrow allocations in the fallback path.
Step 2: At build time, scan the `next` chain once to detect whether the map is collision-free (all adjacent linked pairs share the same key). If so, the probe phase checks the key once per run of consecutive same-probe-idx pairs and accepts or rejects the entire run.
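The two steps can be sketched over plain index vectors. This is an illustrative model rather than the PR's implementation: the function names are hypothetical, keys are a generic slice instead of Arrow columns behind `JoinKeyComparator`, and the second function assumes the build side was already found to be collision-free and that all pairs for one probe row are adjacent in the buffers.

```rust
/// Step 1 sketch: compare each candidate pair and compact the index buffers
/// in place, without allocating any intermediate arrays.
fn retain_equal_pairs<K: PartialEq>(
    build_keys: &[K],
    probe_keys: &[K],
    build_idx: &mut Vec<u64>,
    probe_idx: &mut Vec<u32>,
) {
    debug_assert_eq!(build_idx.len(), probe_idx.len());
    let mut kept = 0;
    for i in 0..build_idx.len() {
        let (b, p) = (build_idx[i], probe_idx[i]);
        if build_keys[b as usize] == probe_keys[p as usize] {
            build_idx[kept] = b;
            probe_idx[kept] = p;
            kept += 1;
        }
    }
    build_idx.truncate(kept);
    probe_idx.truncate(kept);
}

/// Step 2 sketch: valid only when every build chain holds a single key.
/// One comparison on the first pair of a run decides the whole run.
fn retain_equal_runs<K: PartialEq>(
    build_keys: &[K],
    probe_keys: &[K],
    build_idx: &mut Vec<u64>,
    probe_idx: &mut Vec<u32>,
) {
    debug_assert_eq!(build_idx.len(), probe_idx.len());
    let len = build_idx.len();
    let (mut kept, mut start) = (0, 0);
    while start < len {
        // Find the end of the run of pairs that share this probe row.
        let p = probe_idx[start];
        let mut end = start + 1;
        while end < len && probe_idx[end] == p {
            end += 1;
        }
        // A probe key can still collide with a build key, so check once.
        if build_keys[build_idx[start] as usize] == probe_keys[p as usize] {
            build_idx.copy_within(start..end, kept);
            probe_idx.copy_within(start..end, kept);
            kept += end - start;
        }
        start = end;
    }
    build_idx.truncate(kept);
    probe_idx.truncate(kept);
}
```

On a collision-free build side both functions produce the same result, but the run-based version performs one key comparison per probe row instead of one per pair, which is where the reduction from F comparisons to 1 comes from.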
Are these changes tested?
Existing tests pass. I also added some tests for the new function that detects hash collisions, `detect_key_collisions`.
I did some profiling on the added Q23 (this query replicates the level of skew where just 1 partition does the entire join):
We can see the CPU for the HashJoin was cut in half.
The benchmark results also show Q23 running ~2.5× faster:
Are there any user-facing changes?
No